Skip to content

Conversation

@muhamadazmy
Copy link
Contributor

@muhamadazmy muhamadazmy commented Oct 28, 2025

stream: add ChunksTimeout::into_remainder

Summary:
When the underlying stream is an exclusive reference (&mut stream), and we need to drop the ChunksTimout stream without losing the buffered items.

@muhamadazmy
Copy link
Contributor Author

muhamadazmy commented Oct 28, 2025

Use case

use std::time::Duration;

use tokio::sync::mpsc;
use tokio_stream::{StreamExt, wrappers::ReceiverStream};

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<u64>(20);

    let mut receiver = ReceiverStream::new(rx);

    let chunked = (&mut receiver).chunks_timeout(3, Duration::from_secs(3));
    tokio::pin!(chunked);

    tx.send(10).await.unwrap();
    tx.send(20).await.unwrap();

    tokio::select! {
        Some(batch) = chunked.next() => {
            println!("Got: {:?}", batch);
        }
        _ = tokio::time::sleep(Duration::from_secs(1)) => {
            // another condition
        }
    }

    let remainder = chunked.into_remainder();

    // use `receiver` here
}

@ADD-SP ADD-SP added A-tokio Area: The main tokio crate A-tokio-stream Area: The tokio-stream crate and removed A-tokio Area: The main tokio crate labels Oct 28, 2025
@muhamadazmy muhamadazmy force-pushed the pr7715 branch 3 times, most recently from aaa757e to 0fb2203 Compare October 30, 2025 13:51
Copy link
Member

@ADD-SP ADD-SP left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a similar interface in the standard library: std::slice::ChunksExactMut:into_remainder. This might be a better interface for your requirements. What do you think?

@ADD-SP ADD-SP added the S-waiting-on-author Status: awaiting some action (such as code changes) from the PR or issue author. label Nov 5, 2025
@muhamadazmy
Copy link
Contributor Author

muhamadazmy commented Nov 5, 2025

Thank you @ADD-SP for your review. The original plan was to completely consume the stream indeed and just return the reminder. This however won't work with the Pinned ChunksTimeout stream which is required when polling next() in a select statement.

This means an into_reminder won't actually be useful because the only way for this to be useful is if the ChunksTimeout stream is not pinned. And in that case, the reminder will always guaranteed to be empty since there is no way to interrupt a next().await call (unless you are select statement which requires the stream to be pinned)

I will still rename the method to reminder instead of remaining to match the std interface.

@muhamadazmy muhamadazmy force-pushed the pr7715 branch 2 times, most recently from f2c8ed1 to 4187d7b Compare November 5, 2025 08:19
@muhamadazmy muhamadazmy changed the title [ChunksTimeout] Consumes the stream and return the buffered items immediately [ChunksTimeout] Consumes the stream reminder and return the items immediately Nov 5, 2025
@ADD-SP ADD-SP removed the S-waiting-on-author Status: awaiting some action (such as code changes) from the PR or issue author. label Nov 5, 2025
@ADD-SP ADD-SP added the S-waiting-on-author Status: awaiting some action (such as code changes) from the PR or issue author. label Nov 5, 2025
@muhamadazmy
Copy link
Contributor Author

Thank you so much @ADD-SP for your review. I applied all your comments. Let me know if you have more comments.

@martin-g
Copy link
Member

martin-g commented Nov 5, 2025

I think you could add the code example from #7715 (comment) as a snippet in the rustdoc for the new method.

Some new tests would be nice too!

@ADD-SP
Copy link
Member

ADD-SP commented Nov 5, 2025

I think you could add the code example from #7715 (comment) as a snippet in the rustdoc for the new method.

Some new tests would be nice too!

@martin-g That example seems too long, and it’s not easy for downstream readers to grasp it in a few seconds. I’m okay with adding an example, but if we do, it should be simpler than the existing one.

Copy link
Member

@ADD-SP ADD-SP left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, like what @martin-g said, please add tests for this new feature.

@ADD-SP ADD-SP changed the title [ChunksTimeout] Consumes the stream reminder and return the items immediately stream: add ChunksTimeout::into_reminder Nov 5, 2025
@ADD-SP ADD-SP changed the title stream: add ChunksTimeout::into_reminder stream: add ChunksTimeout::into_remainder Nov 5, 2025
@muhamadazmy muhamadazmy changed the title stream: add ChunksTimeout::into_remainder [ChunksTimeout] Consumes the stream reminder and return the items immediately Nov 5, 2025
@muhamadazmy muhamadazmy force-pushed the pr7715 branch 2 times, most recently from d09dc64 to fa42216 Compare November 5, 2025 17:35
@muhamadazmy muhamadazmy requested a review from ADD-SP November 5, 2025 17:36
@muhamadazmy
Copy link
Contributor Author

Dear @ADD-SP, thank you for your time! I’ve added a test case demonstrating how I would use it mainly to recover any buffered remainder, then restore the inner stream and continue consuming it without losing any items.

The example is a simplified version of the test, though I’m not sure if it’s simple enough for readers. Let me know what you think or if you have any suggestions to improve it

@muhamadazmy muhamadazmy force-pushed the pr7715 branch 2 times, most recently from 19d513b to 9a6783e Compare November 5, 2025 17:58
Copy link
Member

@ADD-SP ADD-SP left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw you changed the title, is this intended?

@muhamadazmy
Copy link
Contributor Author

@ADD-SP sorry about changing the title, that was indeed an accident. I use sapling which pushes title and description when submitting the PR. I will use the title you suggested.

@muhamadazmy muhamadazmy changed the title [ChunksTimeout] Consumes the stream reminder and return the items immediately stream: add ChunksTimeout::into_remainder Nov 6, 2025
@muhamadazmy muhamadazmy force-pushed the pr7715 branch 2 times, most recently from d689129 to b464cb5 Compare November 6, 2025 08:40
Summary:
When the underlying stream is an exclusive reference (&mut stream), and we need to drop the `ChunksTimout` stream without losing the buffered items.
@muhamadazmy
Copy link
Contributor Author

muhamadazmy commented Nov 6, 2025

Dear @ADD-SP thank you so much for your tips and pointers. I like the tests much more now with the paused-time. Great feature that I have overlooked 😄

Let me know if you still have any more comments.

@muhamadazmy muhamadazmy requested a review from ADD-SP November 6, 2025 08:44
@ADD-SP ADD-SP changed the title stream: add ChunksTimeout::into_remainder stream: add ChunksTimeout::into_remainder Nov 6, 2025
@ADD-SP ADD-SP removed the S-waiting-on-author Status: awaiting some action (such as code changes) from the PR or issue author. label Nov 6, 2025
Copy link
Member

@ADD-SP ADD-SP left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have considered two different interfaces for this usecase.

into_remainder

pub fn into_remainder(self: Pin<&mut Self>) -> Vec<T>;

Pros

Cons

  • The receiver is self: Pin<&mut Self> instead of self, which is a bit different from the std::slice::ChunksExactMut:into_remainder.
  • The downstream can keep using it after calling into_remainder(). The semantics look a bit strange.
     pinned.as_mut().into_remainder();
     pinned.as_mut().into_remainder();

take_remainder

pub fn take_remainder(self: &mut Pin<&mut Self>) -> Vec<T>;

Pros

  • The semantic looks more straightforward for downstream to keep using it after calling the take_remainder.
    let elems = pinned.take_remainder();
    let _ = pinned.poll_next(&mut cx);

Cons

  • The &mut Pin<&mut Self> is not a common pattern in Rust.

Why choose into_remainder?

Because of the use case. The use case of this interface is to retrieve buffered items after .chunked_timeout().await is canceled, and the downstream is no longer going to keep polling it. For example:

tokio::select! {
    Some(batch) = chunked.next() => todo!(),
    _ = tokio::signal::ctrl_c() => (),
}

let elems = chunked.into_remainder();

For this use case, into_remainder() is better than take_remainder() semantically, because the downstream is not going to poll the chunked again, so consuming it makes more sense.

@ADD-SP ADD-SP merged commit 62ecff8 into tokio-rs:master Nov 6, 2025
87 checks passed
@ADD-SP
Copy link
Member

ADD-SP commented Nov 6, 2025

Thanks!

@muhamadazmy muhamadazmy deleted the pr7715 branch November 6, 2025 12:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

A-tokio-stream Area: The tokio-stream crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants